Focusing on data instead of processes helps a great deal to create robust concurrent programs. You as a programmer
define your data together with functions that should be applied to it and then let the underlying machinery to process the data.
Typically a set of concurrent tasks will be created and then they will be submitted to a thread pool for processing.In GPars the  GParsPool  and  GParsExecutorsPool  classes give you access to low-level data parallelism techniques.
While the  GParsPool  class relies on the jsr-166y Fork/Join framework and so offers greater functionality and better performance,
the  GParsExecutorsPool  uses good old Java executors and so is easier to setup in a managed or restricted environment.There are three fundamental domains covered by the GPars low-level data parallelism:
- Processing collections concurrently
- Running functions (closures) asynchronously
- Performing Fork/Join (Divide/Conquer) algorithms
Dealing with data frequently involves manipulating collections. Lists, arrays, sets, maps, iterators, strings and lot of other data types can be viewed as collections of items.
The common pattern to process such collections is to take elements sequentially, one-by-one, and make an action for each of the items in row.Take, for example, the  min()  function, which is supposed to return the smallest element of a collection. When you call the  min()  method on a collection of numbers,
the caller thread will create an  accumulator  or  so-far-the-smallest-value  initialized to the minimum value of the given type, let say to zero. And then the thread will iterate through the elements of the collection
and compare them with the value in the  accumulator  . Once all elements have been processed, the minimum value is stored in the  accumulator  .This algorithm, however simple, is totally wrong on multi-core hardware. Running the  min()  function on a dual-core chip can leverage at most 50% of the computing power of the chip.
On a quad-core it would be only 25%. Correct, this algorithm effectively wastes 75% of the computing power of the chip.Tree-like structures proved to be more appropriate for parallel processing. The  min()  function in our example doesn't need to iterate through all the elements in row and compare their values with the  accumulator  .
What it can do instead is relying on the multi-core nature of your hardware. A  parallel_min()  function could, for example, compare pairs (or tuples of certain size) of neighboring values
in the collection and promote the smallest value from the tuple into a next round of comparison. Searching for minimum in different tuples can safely happen in parallel and so tuples in the same round
can be processed by different cores at the same time without races or contention among threads.Meet Parallel Arrays
The jsr-166y library brings a very convenient abstraction called Parallel Arrays . GPars leverages the Parallel Arrays implementation
 in several ways. The GParsPool and GParsExecutorsPool classes provide parallel variants of the common Groovy iteration methods like  each()  ,  collect()  ,  findAll()  and such.
 def selfPortraits = images.findAllParallel{it.contains me}.collectParallel {it.resize()}def smallestSelfPortrait = images.parallel.filter{it.contains me}.map{it.resize()}.min{it.sizeInMB}Usage of GParsPool
The  GParsPool  class enables a ParallelArray-based (from JSR-166y) concurrency DSL for collections and objects.Examples of use://summarize numbers concurrently
 GParsPool.withPool {
     final AtomicInteger result = new AtomicInteger(0)
     [1, 2, 3, 4, 5].eachParallel {result.addAndGet(it)}
     assertEquals 15, result
 } //multiply numbers asynchronously
 GParsPool.withPool {
     final List result = [1, 2, 3, 4, 5].collectParallel {it * 2}
     assert ([2, 4, 6, 8, 10].equals(result))
 }//check whether all elements within a collection meet certain criteria
 GParsPool.withPool(5) {ForkJoinPool pool ->
     assert [1, 2, 3, 4, 5].everyParallel {it > 0}
     assert ![1, 2, 3, 4, 5].everyParallel {it > 1}
 }withPool(10) {...}
withPool(20, exceptionHandler) {...}withPool {
     assert [1, 2, 3, 4, 5].everyParallel {it > 0}
     assert ![1, 2, 3, 4, 5].everyParallel {it > 1}
 }
- eachParallel()
- eachWithIndexParallel()
- collectParallel()
- findAllParallel()
- findAnyParallel
- findParallel()
- everyParallel()
- anyParallel()
- grepParallel()
- groupByParallel()
- foldParallel()
- minParallel()
- maxParallel()
- sumParallel()
- splitParallel()
- countParallel()
- foldParallel()
Meta-class enhancer
As an alternative you can use the  ParallelEnhancer  class to enhance meta-classes of any classes or individual instances with the parallel methods.
import groovyx.gpars.ParallelEnhancerdef list = [1, 2, 3, 4, 5, 6, 7, 8, 9]
ParallelEnhancer.enhanceInstance(list)
println list.collectParallel {it * 2 }def animals = ['dog', 'ant', 'cat', 'whale']
ParallelEnhancer.enhanceInstance animals
println (animals.anyParallel {it ==~ /ant/} ? 'Found an ant' : 'No ants found')
println (animals.everyParallel {it.contains('a')} ? 'All animals contain a' : 'Some animals can live without an a')Exception handling
If an exception is thrown while processing any of the passed-in closures, the first exception gets re-thrown from the xxxParallel methods and the algorithm stops as soon as possible.
The exception handling mechanism of GParsPool builds on the one built into the Fork/Join framework. Since Fork/Join algorithms are by nature hierarchical,
once any part of the algorithm fails, there's usually little benefit from continuing the computation, since some branches of the algorithm will never return a result.Bear in mind that the GParsPool implementation doesn't give any guarantees about its behavior after a first unhandled exception occurs,
beyond stopping the algorithm and re-throwing the first detected exception to the caller.
This behavior, after all, is consistent with what the traditional sequential iteration methods do.
Transparently parallel collections
On top of adding new xxxParallel() methods, GPars can also let you change the semantics of the original iteration methods. For example, you may be passing a collection into a library method, which will process your collection
in a sequential way, let say using the  collect()  method. By changing the semantics of the  collect()  method on your collection you can effectively parallelize the library sequential code.GParsPool.withPool {    //The selectImportantNames() will process the name collections concurrently
    assert ['ALICE', 'JASON'] == selectImportantNames(['Joe', 'Alice', 'Dave', 'Jason'].makeConcurrent())
}/**
 * A function implemented using standard sequential collect() and findAll() methods.
 */
def selectImportantNames(names) {
    names.collect {it.toUpperCase()}.findAll{it.size() > 4}
}import static groovyx.gpars.GParsPool.withPooldef list = [1, 2, 3, 4, 5, 6, 7, 8, 9]println 'Sequential: '
list.each { print it + ',' }
println()withPool {    println 'Sequential: '
    list.each { print it + ',' }
    println()    list.makeConcurrent()    println 'Concurrent: '
    list.each { print it + ',' }
    println()    list.makeSequential()    println 'Sequential: '
    list.each { print it + ',' }
    println()
}println 'Sequential: '
list.each { print it + ',' }
println()import static groovyx.gpars.GParsPool.withPooldef list = [1, 2, 3, 4, 5, 6, 7, 8, 9]println 'Sequential: '
list.each { print it + ',' }
println()withPool {    println 'Sequential: '
    list.each { print it + ',' }
    println()    list.asConcurrent {
        println 'Concurrent: '
        list.each { print it + ',' }
        println()
    }    println 'Sequential: '
    list.each { print it + ',' }
    println()
}println 'Sequential: '
list.each { print it + ',' }
println()/**
 * A function implemented using standard sequential collect() and findAll() methods.
 */
def selectImportantNames(names) {
    names.collect {it.toUpperCase()}.findAll{it.size() > 4}
}def names = ['Joe', 'Alice', 'Dave', 'Jason']
ParallelEnhancer.enhanceInstance(names)
//The selectImportantNames() will process the name collections concurrently
assert ['ALICE', 'JASON'] == selectImportantNames(names.makeConcurrent())import groovyx.gpars.ParallelEnhancerdef list = [1, 2, 3, 4, 5, 6, 7, 8, 9]println 'Sequential: '
list.each { print it + ',' }
println()ParallelEnhancer.enhanceInstance(list)println 'Sequential: '
list.each { print it + ',' }
println()list.asConcurrent {
    println 'Concurrent: '
    list.each { print it + ',' }
    println()}
list.makeSequential()println 'Sequential: '
list.each { print it + ',' }
println()Avoid side-effects in functions
We have to warn you. Since the closures that are provided to the parallel methods like  eachParallel()  or  collectParallel()  may be run in parallel, you have to make sure that each of the closures
is written in a thread-safe manner. The closures must hold no internal state, share data nor have side-effects beyond the boundaries the single element that they've been invoked on.
Violations of these rules will open the door for race conditions and deadlocks, the most severe enemies of a modern multi-core programmer.Don't do this:
def thumbnails = []
images.eachParallel {thumbnails << it.thumbnail}  //Concurrently accessing a not-thread-safe collection of thumbnails, don't do this!Usage of GParsExecutorsPool
The  GParsPool  class enables a Java Executors-based concurrency DSL for collections and objects.The  GParsExecutorsPool  class can be used as a pure-JDK-based collection parallel processor. Unlike the  GParsPool  class,  GParsExecutorsPool  doesn't require jsr-166y jar file, but leverages the standard JDK executor services to parallelize closures processing a collections or an object iteratively.
It needs to be states, however, that  GParsPool  performs typically much better than  GParsExecutorsPool  does.Examples of use:
//multiply numbers asynchronously
 GParsExecutorsPool.withPool {
     Collection<Future> result = [1, 2, 3, 4, 5].collectParallel{it * 10}
     assertEquals(new HashSet([10, 20, 30, 40, 50]), new HashSet((Collection)result*.get()))
 } //multiply numbers asynchronously using an asynchronous closure
 GParsExecutorsPool.withPool {
     def closure={it * 10}
     def asyncClosure=closure.async()
     Collection<Future> result = [1, 2, 3, 4, 5].collect(asyncClosure)
     assertEquals(new HashSet([10, 20, 30, 40, 50]), new HashSet((Collection)result*.get()))
 }//find an element meeting specified criteria
 GParsExecutorsPool.withPool(5) {ExecutorService service ->
     service.submit({performLongCalculation()} as Runnable)
 }withPool(10) {...}
withPool(20, threadFactory) {...}withPool {
     def result = [1, 2, 3, 4, 5].findParallel{Number number -> number > 2}
     assert result in [3, 4, 5]
 }
- eachParallel()
- eachWithIndexParallel()
- collectParallel()
- findAllParallel()
- findParallel()
- allParallel()
- anyParallel()
- grepParallel()
- groupByParallel()
Meta-class enhancer
As an alternative you can use the  GParsExecutorsPoolEnhancer  class to enhance meta-classes for any classes or individual instances with asynchronous methods.
import groovyx.gpars.GParsExecutorsPoolEnhancerdef list = [1, 2, 3, 4, 5, 6, 7, 8, 9]
GParsExecutorsPoolEnhancer.enhanceInstance(list)
println list.collectParallel {it * 2 }def animals = ['dog', 'ant', 'cat', 'whale']
GParsExecutorsPoolEnhancer.enhanceInstance animals
println (animals.anyParallel {it ==~ /ant/} ? 'Found an ant' : 'No ants found')
println (animals.allParallel {it.contains('a')} ? 'All animals contain a' : 'Some animals can live without an a')Exception handling
If exceptions are thrown while processing any of the passed-in closures, an instance of  AsyncException  wrapping all the original exceptions gets re-thrown from the xxxParallel methods.Avoid side-effects in functions
Once again we need to warn you about using closures with side-effects effecting objects beyond the scope of the single currently processed element or closures which keep state. Don't do that! It is dangerous to pass them to any of the  xxxParallel()  methods.
The  memoize  function enables caching of function's return values. Repeated calls to the memoized function with the same argument values
will, instead of invoking the calculation encoded in the original function, retrieve the result value from an internal transparent cache.
Provided the calculation is considerably slower than retrieving a cached value from the cache, this allows users to trade-off memory for performance.
Checkout out the example, where we attempt to scan multiple websites for particular content:The memoize functionality of GPars has been contributed to Groovy in version 1.8 and if you run on Groovy 1.8 or later, it is recommended to use the Groovy functionality.
Memoize in GPars is almost identical, except that it searches the memoize caches concurrently using the surrounding thread pool and so may give
performance benefits in some scenarios.
The GPars memoize functionality has been renamed to avoid future conflicts with the memoize functionality in Groovy.
GPars now calls the methods with a preceding letter  g  , such as gmemoize().
Examples of use
GParsPool.withPool {
    def urls = ['http://www.dzone.com', 'http://www.theserverside.com', 'http://www.infoq.com']
    Closure download = {url ->
        println "Downloading $url"
        url.toURL().text.toUpperCase()
    }
    Closure cachingDownload = download.gmemoize()    println 'Groovy sites today: ' + urls.findAllParallel {url -> cachingDownload(url).contains('GROOVY')}
    println 'Grails sites today: ' + urls.findAllParallel {url -> cachingDownload(url).contains('GRAILS')}
    println 'Griffon sites today: ' + urls.findAllParallel {url -> cachingDownload(url).contains('GRIFFON')}
    println 'Gradle sites today: ' + urls.findAllParallel {url -> cachingDownload(url).contains('GRADLE')}
    println 'Concurrency sites today: ' + urls.findAllParallel {url -> cachingDownload(url).contains('CONCURRENCY')}
    println 'GPars sites today: ' + urls.findAllParallel {url -> cachingDownload(url).contains('GPARS')}
}Fibonacci example
A purely functional, recursive implementation, following closely the definition of Fibonacci numbers is exponentially complex:Closure fib = {n -> n > 1 ? call(n - 1) + call(n - 2) : n}Closure fib
fib = {n -> n > 1 ? fib(n - 1) + fib(n - 2) : n}.gmemoize()Available variants
memoize
The basic variant, which keeps values in the internal cache for the whole lifetime of the memoized function. Provides the best performance
characteristics of all the variants.memoizeAtMost
Allows the user to set a hard limit on number of items cached. Once the limit has been reached, all subsequently added values
will eliminate the oldest value from the cache using the LRU (Last Recently Used) strategy.So for our Fibonacci number example, we could safely reduce the cache size to two items:Closure fib
fib = {n -> n > 1 ? fib(n - 1) + fib(n - 2) : n}.memoizeAtMost(2)
- Keep the memory footprint of the cache within defined boundaries
- Preserve desired performance characteristics of the function. Too large caches may take longer to retrieve the cached value than it would have taken to calculate the result directly.
memoizeAtLeast
Allows unlimited growth of the internal cache until the JVM's garbage collector decides to step in and evict SoftReferences,
used by our implementation, from the memory. The single parameter value to the  memoizeAtLeast()  method specifies the minimum number
of cached items that should be protected from gc eviction. The cache will never shrink below the specified number of entries.
The cache ensures it only protects the most recently used items from eviction using the LRU (Last Recently Used) strategy.memoizeBetween
Combines memoizeAtLeast and memoizeAtMost and so allowing the cache to grow and shrink in the range between the two parameter values
depending on available memory and the gc activity, yet the cache size will never exceed the upper size limit
to preserve desired performance characteristics of the cache.The Parallel Collection Map/Reduce DSL gives GPars a more functional flavor. In general, the Map/Reduce DSL may be used for the same purpose as the  xxxParallel()  family methods and has very similar semantics.
On the other hand, Map/Reduce can perform considerably faster, if you need to chain multiple methods to process a single collection in multiple steps:
println 'Number of occurrences of the word GROOVY today: ' + urls.parallel
            .map {it.toURL().text.toUpperCase()}
            .filter {it.contains('GROOVY')}
            .map{it.split()}
            .map{it.findAll{word -> word.contains 'GROOVY'}.size()}
            .sum()
- map()
- reduce()
- filter()
- size()
- sum()
- min()
- max()
- sort()
- groupBy()
- combine()
Returning back to a plain Groovy collection instance is always just a matter of retrieving the  collection  property.def myNumbers = (1..1000).parallel.filter{it % 2 == 0}.map{Math.sqrt it}.collectionAvoid side-effects in functions
Once again we need to warn you. To avoid nasty surprises, please, keep your closures, which you pass to the Map/Reduce functions, stateless and clean from side-effects.Availability
This feature is only available when using in the Fork/Join-based  GParsPool  , not in  GParsExecutorsPool  .Classical Example
A classical example, inspired by http://github.com/thevery, counting occurencies of words in a string:import static groovyx.gpars.GParsPool.withPooldef words = "This is just a plain text to count words in"
print count(words)def count(arg) {
  withPool {
    return arg.parallel
      .map{[it, 1]}
      .groupBy{it[0]}.getParallel()
      .map {it.value=it.value.size();it}
      .sort{-it.value}.collection
  }
}def words = "This is just a plain text to count words in"
print count(words)def count(arg) {
  withPool {
    return arg.parallel
      .map{[it, 1]}
      .combine(0) {sum, value -> sum + value}.getParallel()
      .sort{-it.value}.collection
  }
}Combine
The  combine  operation expects on its input a list of tuples (two-element lists) considered to be key-value pairs (such as [key1, value1, key2, value2, key1, value3, key3, value4 … ] )
with potentially repeating keys. When invoked,  combine  merges the values for identical keys using the provided accumulator function and produces a map mapping the original (unique) keys to their accumulated values.
E.g. [a, b, c, d, a, e, c, f] will be combined into a : b+e, c : d+f, while the '+' operation on the values needs to be provided by the user as the accumulation closure.
The  accumulation function  argument needs to specify a function to use for combining (accumulating) the values belonging to the same key.
An  initial accumulator value  needs to be provided as well. Since the  combine  method processes items in parallel, the  initial accumulator value  will be reused multiple times.
Thus the provided value must allow for reuse. It should be either a cloneable or immutable value or a closure returning a fresh initial accumulator each time requested.
Good combinations of accumulator functions and reusable initial values include:
accumulator = {List acc, value -> acc << value} initialValue = []
accumulator = {List acc, value -> acc << value} initialValue = {-> []}
accumulator = {int sum, int value -> acc + value} initialValue = 0
accumulator = {int sum, int value -> sum + value} initialValue = {-> 0}
accumulator = {ShoppingCart cart, Item value -> cart.addItem(value)} initialValue = {-> new ShoppingCart()}groovyx.gpars.GParsPool.withPool {
    assert 15 == [1, 2, 3, 4, 5].parallelArray.reduce({a, b -> a + b} as Reducer, 0)                                        //summarize
    assert 55 == [1, 2, 3, 4, 5].parallelArray.withMapping({it ** 2} as Mapper).reduce({a, b -> a + b} as Reducer, 0)       //summarize squares
    assert 20 == [1, 2, 3, 4, 5].parallelArray.withFilter({it % 2 == 0} as Predicate)                                       //summarize squares of even numbers
            .withMapping({it ** 2} as Mapper)
            .reduce({a, b -> a + b} as Reducer, 0)    assert 'aa:bb:cc:dd:ee' == 'abcde'.parallelArray                                                                        //concatenate duplicated characters with separator
            .withMapping({it * 2} as Mapper)
            .reduce({a, b -> "$a:$b"} as Reducer, "")Usage of GParsPool and GParsExecutorsPool asynchronous processing facilities
Both  GParsPool  and  GParsExecutorsPool  provide almost identical services in this domain, although they leverage different
underlying machinery, based on which of the two classes the user chooses.Closures enhancements
The following methods are added to closures inside the  GPars(Executors)Pool.withPool()  blocks:
- async() - Creates an asynchronous variant of the supplied closure, which when invoked returns a future for the potential return value
- callAsync() - Calls a closure in a separate thread supplying the given arguments, returning a future for the potential return value,
Examples:GParsPool.withPool() {
    Closure longLastingCalculation = {calculate()}
    Closure fastCalculation = longLastingCalculation.async()  //create a new closure, which starts the original closure on a thread pool
    Future result=fastCalculation()                           //returns almost immediately
    //do stuff while calculation performs …
    println result.get()
}GParsPool.withPool() {
    /**
     * The callAsync() method is an asynchronous variant of the default call() method to invoke a closure.
     * It will return a Future for the result value.
     */
    assert 6 == {it * 2}.call(3)
    assert 6 == {it * 2}.callAsync(3).get()
}Timeouts
The  callTimeoutAsync()  methods, taking either a long value or a Duration instance, allow the user to have the calculation cancelled after a given time interval.{->
    while(true) {
        Thread.sleep 1000  //Simulate a bit of interesting calculation
        if (Thread.currentThread().isInterrupted()) break;  //We've been cancelled
    }
}.callTimeoutAsync(2000)Executor Service enhancements
The ExecutorService and jsr166y.forkjoin.ForkJoinPool class is enhanced with the << (leftShift) operator to submit tasks to the pool and return
a  Future  for the result.Example:
GParsExecutorsPool.withPool {ExecutorService executorService ->
    executorService << {println 'Inside parallel task'}
}Running functions (closures) in parallel
The  GParsPool  and  GParsExecutorsPool  classes also provide handy methods  executeAsync()  and  executeAsyncAndWait()  to easily run multiple closures asynchronously.Example:
GParsPool.withPool {
    assertEquals([10, 20], GParsPool.executeAsyncAndWait({calculateA()}, {calculateB()}))         //waits for results
    assertEquals([10, 20], GParsPool.executeAsync({calculateA()}, {calculateB()})*.get())  //returns Futures instead and doesn't wait for results to be calculated
}Functions in Groovy
We can treat Groovy closures as functions. They take arguments, do their calculation and return a value. Provided you don't let your
closures touch anything outside their scope, your closures are well-behaved pure functions. Functions that you can combine for a better good.
def sum = (0..100000).inject(0, {a, b -> a + b})def max = myNumbers.inject(0, {a, b -> a>b?a:b})Are we concurrent yet?
This all works just fine until you realize you're not utilizing the full power of your expensive hardware. The functions are plain sequential.
No parallelism in here. All but one processor core do nothing, they're idle, totally wasted.
Those paying attention would suggest to use the  Parallel Collection  techniques described earlier and they would certainly be correct.
For our scenario described here, where we process a collection, using those  parallel  methods would be the best choice.
However, we're now looking for a generic way to create and combine asynchronous functions , which would help us
not only for collection processing but mostly in other more generic cases, like the one right below.
To make things more obvious, here's an example of combining four functions, which are supposed to check whether a particular web page matches the contents of a local file.
We need to download the page, load the file, calculate hashes of both and finally compare the resulting numbers.
Closure download = {String url ->
    url.toURL().text
}Closure loadFile = {String fileName ->
    …  //load the file here
}Closure hash = {s -> s.hashCode()}.asyncFun()Closure compare = {int first, int second ->
    first == second
}def result = compare(hash(download('http://www.gpars.org')), hash(loadFile('/coolStuff/gpars/website/index.html')))
println "The result of comparison: " + resultMaking it all asynchronous
The downside of our code is that we don't leverage the independence of the  download()  and the  loadFile()  functions.
Neither we allow the two hashes to be run concurrently. They could well run in parallel, but our way to combine functions restricts any parallelism.Obviously not all of the functions can run concurrently. Some functions depend on results of others. They cannot start before the other function finishes.
We need to block them till their parameters are available. The  hash()  functions needs a string to work on. The  compare()  function needs two numbers to compare.So we can only parallelize some functions, while blocking parallelism of others. Seems like a challenging task.Things are bright in the functional world
Luckily, the dependencies between functions are already expressed implicitly in the code. There's no need for us to duplicate the dependency information.
If one functions takes parameters and the parameters need first to be calculated by another function, we implicitly have a dependency here. The  hash()  function
depends on the  loadFile()  as well as on the  download()  functions in our example.
The  inject  function in our earlier example depends on the results of the  addition  functions invoked gradually on all the elements of the collection.
However difficult it may seem at first, our task is in fact very simple. We only need to teach our functions to return  promises  of their future results. And we need to teach the other functions
to accept those  promises  as parameters so that they wait for the real values before they start their work.
And if we convince the functions to release the threads they hold while waiting for the values, we get directly to where the magic can happen.
In the good tradition of  GPars  we've made it very straightforward for you to convince any function to believe in other functions' promises. Call the  asyncFun()  function on a closure
and you're asynchronous.
withPool {
    def maxPromise = numbers.inject(0, {a, b -> a>b?a:b}.asyncFun())
    println "Look Ma, I can talk to the user while the math is being done for me!"
    println maxPromise.get()
}
The  promise  is a good old  DataflowVariable  , so you may query its status, register notification hooks or make it an input to a Dataflow algorithm.
withPool {
    def sumPromise = (0..100000).inject(0, {a, b -> a + b}.asyncFun())
    println "Are we done yet? " + sumPromise.bound
    sumPromise.whenBound {sum -> println sum}
}
The  get()  method has also a variant with a timeout parameter, if you want to avoid the risk of waiting indefinitely.
Can things go wrong?
Sure. But you'll get an exception thrown from the result promise  get()  method.try {
    sumPromise.get()
} catch (MyCalculationException e) {
    println "Guess, things are not ideal today."
}This is all fine, but what functions can be really combined?
There are no limits. Take any sequential functions you need to combine and you should be able to combine their asynchronous variants as well.Back to our initial example comparing content of a file with a web page, we simply make all the functions asynchronous by calling
the  asyncFun()  method on them and we are ready to set off.Closure download = {String url ->
        url.toURL().text
    }.asyncFun()    Closure loadFile = {String fileName ->
        …  //load the file here
    }.asyncFun()    Closure hash = {s -> s.hashCode()}.asyncFun()    Closure compare = {int first, int second ->
        first == second
    }.asyncFun()    def result = compare(hash(download('http://www.gpars.org')), hash(loadFile('/coolStuff/gpars/website/index.html')))
    println 'Allowed to do something else now'
    println "The result of comparison: " + result.get()Calling asynchronous functions from within asynchronous functions
Another very valuable characteristics of asynchronous functions is that their result promises can also be composed.import static groovyx.gpars.GParsPool.withPool  withPool {
      Closure plus = {Integer a, Integer b ->
          sleep 3000
          println 'Adding numbers'
          a + b
      }.asyncFun()      Closure multiply = {Integer a, Integer b ->
          sleep 2000
          a * b
      }.asyncFun()      Closure measureTime = {->
          sleep 3000
          4
      }.asyncFun()      Closure distance = {Integer initialDistance, Integer velocity, Integer time ->
          plus(initialDistance, multiply(velocity, time))
      }.asyncFun()      Closure chattyDistance = {Integer initialDistance, Integer velocity, Integer time ->
          println 'All parameters are now ready - starting'
          println 'About to call another asynchronous function'
          def innerResultPromise = plus(initialDistance, multiply(velocity, time))
          println 'Returning the promise for the inner calculation as my own result'
          return innerResultPromise
      }.asyncFun()      println "Distance = " + distance(100, 20, measureTime()).get() + ' m'
      println "ChattyDistance = " + chattyDistance(100, 20, measureTime()).get() + ' m'
  }Methods as asynchronous functions
Methods can be referred to as closures using the  .&  operator. These closures can then be transformed using  asyncFun  into composable asynchronous functions just like ordinary closures.class DownloadHelper {
    String download(String url) {
        url.toURL().text
    }    int scanFor(String word, String text) {
        text.findAll(word).size()
    }    String lower(s) {
        s.toLowerCase()
    }
}
//now we'll make the methods asynchronous
withPool {
    final DownloadHelper d = new DownloadHelper()
    Closure download = d.&download.asyncFun()
    Closure scanFor = d.&scanFor.asyncFun()
    Closure lower = d.&lower.asyncFun()    //asynchronous processing
    def result = scanFor('groovy', lower(download('http://www.infoq.com')))
    println 'Allowed to do something else now'
    println result.get()
}Using annotation to create asynchronous functions
Instead of calling the  asyncFun()  function, the  @AsyncFun  annotation can be used to annotate Closure-typed fields.
The fields have to be initialized in-place and the containing class needs to be instantiated withing a  withPool  block.import static groovyx.gpars.GParsPool.withPool
import groovyx.gpars.AsyncFunclass DownloadingSearch {
    @AsyncFun Closure download = {String url ->
        url.toURL().text
    }    @AsyncFun Closure scanFor = {String word, String text ->
        text.findAll(word).size()
    }    @AsyncFun Closure lower = {s -> s.toLowerCase()}    void scan() {
        def result = scanFor('groovy', lower(download('http://www.infoq.com')))  //synchronous processing
        println 'Allowed to do something else now'
        println result.get()
    }
}withPool {
    new DownloadingSearch().scan()
}Alternative pools
The  AsyncFun  annotation by default uses an instance of  GParsPool  from the wrapping withPool block. You may, however, specify the type of pool explicitly:
@AsyncFun(GParsExecutorsPoolUtil) def sum6 = {a, b -> a + b }Blocking functions through annotations
The  AsyncFun  also allows the user to specify, whether the resulting function should have blocking (true) or non-blocking (false - default) semantics.@AsyncFun(blocking = true)
def sum = {a, b -> a + b }Parallel speculations
Imagine you need to perform a task like e.g. calculate an expensive function or read data from a file, database or internet. Luckily, you know of several good ways (e.g. functions or urls)
to achieve your goal. However, they are not all equal. Although they return back the same (as far as your needs are concerned) result, they may all take different amount of time to complete
and some of them may even fail (e.g. network issues). What's worse, no-one is going to tell you which path gives you the solution first nor which paths lead to no solution at all. Shall I
run  quick sort  or  merge sort  on my list? Which url will work best? Is this service available at its primary location or should I use the backup one?GPars speculations give you the option to try all the available alternatives in parallel and so get the result from the fastest functional path, silently ignoring the slow or broken ones.This is what the  speculate()  methods on  GParsPool  and  GParsExecutorsPool()  can do.def numbers = …
def quickSort = …
def mergeSort = …
def sortedNumbers = speculate(quickSort, mergeSort)
import static groovyx.gpars.GParsPool.speculate
import static groovyx.gpars.GParsPool.withPooldef alternative1 = {
    'http://www.dzone.com/links/index.html'.toURL().text
}def alternative2 = {
    'http://www.dzone.com/'.toURL().text
}def alternative3 = {
    'http://www.dzzzzzone.com/'.toURL().text  //wrong url
}def alternative4 = {
    'http://dzone.com/'.toURL().text
}withPool(4) {
    println speculate([alternative1, alternative2, alternative3, alternative4]).contains('groovy')
}
Make sure the surrounding thread pool has enough threads to process all alternatives in parallel. The size of the pool should match
the number of closures supplied.
Alternatives using dataflow variables and streams
In cases, when stopping unsuccessful alternatives is not needed, dataflow variables or streams may be used to obtain the result value
from the winning speculation.
Please refer to the Dataflow Concurrency section of the User Guide for details on Dataflow variables and streams.
import groovyx.gpars.dataflow.DataflowQueue
import static groovyx.gpars.dataflow.Dataflow.taskdef alternative1 = {
    'http://www.dzone.com/links/index.html'.toURL().text
}def alternative2 = {
    'http://www.dzone.com/'.toURL().text
}def alternative3 = {
    'http://www.dzzzzzone.com/'.toURL().text  //will fail due to wrong url
}def alternative4 = {
    'http://dzone.com/'.toURL().text
}//Pick either one of the following, both will work:
final def result = new DataflowQueue()
//  final def result = new DataflowVariable()[alternative1, alternative2, alternative3, alternative4].each {code ->
    task {
        try {
            result << code()
        } catch (ignore) { }  //We deliberately ignore unsuccessful urls
    }
}println result.val.contains('groovy')The abstraction
When talking about hierarchical problems, think about quick sort, merge sort, file system or general tree navigation and such.
- Fork / Join algorithms essentially split a problem at hands into several smaller sub-problems and recursively apply the same algorithm to each of the sub-problems.
- Once the sub-problem is small enough, it is solved directly.
- The solutions of all sub-problems are combined to solve their parent problem, which in turn helps solve its own parent problem.
Check out the fancy interactive Fork/Join visualization demo ,
which will show you how threads cooperate to solve a common divide-and-conquer algorithm.
The mighty JSR-166y library solves Fork / Join orchestration pretty nicely for us, but leaves a couple of rough edges, which can hurt you, if you don't pay attention enough. You still deal
with threads, pools or synchronization barriers.The GPars abstraction convenience layer
GPars can hide the complexities of dealing with threads, pools and recursive tasks from you, yet let you leverage the powerful Fork/Join implementation in jsr166y.import static groovyx.gpars.GParsPool.runForkJoin
import static groovyx.gpars.GParsPool.withPoolwithPool() {
    println """Number of files: ${
        runForkJoin(new File("./src")) {file ->
            long count = 0
            file.eachFile {
                if (it.isDirectory()) {
                    println "Forking a child task for $it"
                    forkOffChild(it)           //fork a child task
                } else {
                    count++
                }
            }
            return count + (childrenResults.sum(0))
            //use results of children tasks to calculate and store own result
        }
    }"""
}def quicksort(numbers) {
    withPool {
        runForkJoin(0, numbers) {index, list ->
            def groups = list.groupBy {it <=> list[list.size().intdiv(2)]}
            if ((list.size() < 2) || (groups.size() == 1)) {
                return [index: index, list: list.clone()]
            }
            (-1..1).each {forkOffChild(it, groups[it] ?: [])}
            return [index: index, list: childrenResults.sort {it.index}.sum {it.list}]
        }.list
    }
}Alternative approach
Alternatively, the underlying mechanism of nested Fork/Join worker tasks can be used directly. Custom-tailored workers can
eliminate the performance overhead associated with parameter spreading imposed when using the generic workers. Also, custom
workers can be implemented in Java and so further increase the performance of the algorithm.public final class FileCounter extends AbstractForkJoinWorker<Long> {
    private final File file;    def FileCounter(final File file) {
        this.file = file
    }    @Override
    protected Long computeTask() {
        long count = 0;
        file.eachFile {
            if (it.isDirectory()) {
                println "Forking a thread for $it"
                forkOffChild(new FileCounter(it))           //fork a child task
            } else {
                count++
            }
        }
        return count + ((childrenResults)?.sum() ?: 0)  //use results of children tasks to calculate and store own result
    }
}withPool(1) {pool ->  //feel free to experiment with the number of fork/join threads in the pool
    println "Number of files: ${runForkJoin(new FileCounter(new File("..")))}"
}Fork / Join saves your resources
Fork/Join operations can be safely run with small number of threads thanks to internally using the TaskBarrier class to synchronize the threads. While a thread is blocked inside an algorithm waiting for its sub-problems to be calculated, the thread is silently returned to the pool to take on any of the available sub-problems from the task queue and process them.
Although the algorithm creates as many tasks as there are sub-directories and tasks wait for the sub-directory tasks to complete, as few as one thread is enough to keep the computation going and eventually calculate a valid result.Mergesort example
import static groovyx.gpars.GParsPool.runForkJoin
import static groovyx.gpars.GParsPool.withPool/**
 * Splits a list of numbers in half
 */
def split(List<Integer> list) {
    int listSize = list.size()
    int middleIndex = listSize / 2
    def list1 = list[0..<middleIndex]
    def list2 = list[middleIndex..listSize - 1]
    return [list1, list2]
}/**
 * Merges two sorted lists into one
 */
List<Integer> merge(List<Integer> a, List<Integer> b) {
    int i = 0, j = 0
    final int newSize = a.size() + b.size()
    List<Integer> result = new ArrayList<Integer>(newSize)    while ((i < a.size()) && (j < b.size())) {
        if (a[i] <= b[j]) result << a[i++]
        else result << b[j++]
    }    if (i < a.size()) result.addAll(a[i..-1])
    else result.addAll(b[j..-1])
    return result
}final def numbers = [1, 5, 2, 4, 3, 8, 6, 7, 3, 4, 5, 2, 2, 9, 8, 7, 6, 7, 8, 1, 4, 1, 7, 5, 8, 2, 3, 9, 5, 7, 4, 3]withPool(3) {  //feel free to experiment with the number of fork/join threads in the pool
    println """Sorted numbers: ${
        runForkJoin(numbers) {nums ->
            println "Thread ${Thread.currentThread().name[-1]}: Sorting $nums"
            switch (nums.size()) {
                case 0..1:
                    return nums                                   //store own result
                case 2:
                    if (nums[0] <= nums[1]) return nums     //store own result
                    else return nums[-1..0]                       //store own result
                default:
                    def splitList = split(nums)
                    [splitList[0], splitList[1]].each {forkOffChild it}  //fork a child task
                    return merge(* childrenResults)      //use results of children tasks to calculate and store own result
            }
        }
    }"""
}Mergesort example using a custom-tailored worker class
public final class SortWorker extends AbstractForkJoinWorker<List<Integer>> {
    private final List numbers    def SortWorker(final List<Integer> numbers) {
        this.numbers = numbers.asImmutable()
    }    /**
     * Splits a list of numbers in half
     */
    def split(List<Integer> list) {
        int listSize = list.size()
        int middleIndex = listSize / 2
        def list1 = list[0..<middleIndex]
        def list2 = list[middleIndex..listSize - 1]
        return [list1, list2]
    }    /**
     * Merges two sorted lists into one
     */
    List<Integer> merge(List<Integer> a, List<Integer> b) {
        int i = 0, j = 0
        final int newSize = a.size() + b.size()
        List<Integer> result = new ArrayList<Integer>(newSize)        while ((i < a.size()) && (j < b.size())) {
            if (a[i] <= b[j]) result << a[i++]
            else result << b[j++]
        }        if (i < a.size()) result.addAll(a[i..-1])
        else result.addAll(b[j..-1])
        return result
    }    /**
     * Sorts a small list or delegates to two children, if the list contains more than two elements.
     */
    @Override
    protected List<Integer> computeTask() {
        println "Thread ${Thread.currentThread().name[-1]}: Sorting $numbers"
        switch (numbers.size()) {
            case 0..1:
                return numbers                                   //store own result
            case 2:
                if (numbers[0] <= numbers[1]) return numbers     //store own result
                else return numbers[-1..0]                       //store own result
            default:
                def splitList = split(numbers)
                [new SortWorker(splitList[0]), new SortWorker(splitList[1])].each{forkOffChild it}  //fork a child task
                return merge(* childrenResults)      //use results of children tasks to calculate and store own result
        }
    }
}final def numbers = [1, 5, 2, 4, 3, 8, 6, 7, 3, 4, 5, 2, 2, 9, 8, 7, 6, 7, 8, 1, 4, 1, 7, 5, 8, 2, 3, 9, 5, 7, 4, 3]withPool(1) {  //feel free to experiment with the number of fork/join threads in the pool
    println "Sorted numbers: ${runForkJoin(new SortWorker(numbers))}"
}Running child tasks directly
The  forkOffChild()  method has a sibling - the  runChildDirectly()  method, which will run the child task directly and immediately
within the current thread instead of scheduling the child task for asynchronous processing on the thread pool. Typically you'll
call _forkOffChild() on all sub-tasks but the last, which you invoke directly without the scheduling overhead.
Closure fib = {number ->
            if (number <= 2) {
                return 1
            }
            forkOffChild(number - 1)                            //  This task will run asynchronously, probably in a different thread
            final def result = runChildDirectly(number - 2)     //  This task is run directly within the current thread
            return (Integer) getChildrenResults().sum() + result
        }        withPool {
            assert 55 == runForkJoin(10, fib)
        }Availability
This feature is only available when using in the Fork/Join-based  GParsPool  , not in  GParsExecutorsPool  .